﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using mqtttLib.Helper;
using mqtttLib.Helper.TcpSocket;
using mqtttLib.Messages;
using mqtttLib.Messages.Base;

namespace mqtttLib
{
    /// <summary>
    /// Connection life-cycle states of <see cref="MqttClient"/>.
    /// </summary>
    public enum ConnectionState
    {
        /// <summary>
        /// No connection has been attempted (first member, so it is the default value).
        /// </summary>
        None,
        /// <summary>
        /// The connection has been lost or a connection attempt failed.
        /// </summary>
        Disconnected,

        /// <summary>
        /// A connect request has been sent to the broker and the client is waiting for the answer.
        /// </summary>
        Connecting,

        /// <summary>
        /// The broker accepted the connection.
        /// </summary>
        Connected,
    }

    /// <summary>
    /// Subscription record kept by <see cref="MqttClient"/> so that topics can be
    /// re-subscribed after a reconnect or when no acknowledgement arrived.
    /// </summary>
    public class Subscribe
    {
        /// <summary>
        /// Topic of the subscription.
        /// </summary>
        public string Topics { get; set; }
        /// <summary>
        /// Quality-of-service level requested for the topic.
        /// </summary>
        public Qos Qos { get; set; }
        /// <summary>
        /// Acknowledgement state: false after the subscribe request was (re)queued,
        /// true once a matching acknowledgement has been received.
        /// </summary>
        public bool State { get; set; }
        /// <summary>
        /// Time at which the subscribe request was last queued for sending.
        /// </summary>
        public DateTime SendTime { get; set; }
    }

    /// <summary>
    /// 通信服务
    /// </summary>
    public class MqttClient : HandleClient, IDisposable
    {
        #region 私有字段

        /// <summary>
        /// Transport used to exchange bytes with the broker. It stays null until
        /// <see cref="Connection"/> creates it.
        /// </summary>
        private TcpClient _commun;
        /// <summary>
        /// Client identifier used when building the connect message.
        /// </summary>
        private readonly string _clientId;
        /// <summary>
        /// Broker address handed to the transport.
        /// </summary>
        private readonly string _ip;
        /// <summary>
        /// Broker port handed to the transport.
        /// </summary>
        private readonly int _port;
        /// <summary>
        /// Optional logger; null when the caller did not supply one.
        /// </summary>
        private readonly ILog _logManager;
        /// <summary>
        /// Reports a message to the <see cref="LogEvent"/> subscribers first and then to the
        /// optional logger.
        /// </summary>
        /// <param name="title">Short label describing what happened to the message.</param>
        /// <param name="message">The message being reported.</param>
        private void Log(string title, MqttMessage message)
        {
            var listeners = LogEvent;
            if (listeners != null)
            {
                listeners(title, message);
            }

            if (_logManager != null)
            {
                _logManager.Info(title, message);
            }
        }
        /// <summary>
        /// Messages waiting to be sent by the background send loop.
        /// NOTE(review): this queue is filled by callers and drained by a background task
        /// without any visible synchronization - confirm whether that is safe.
        /// </summary>
        private readonly Queue<MqttMessage> _sendMessage = new Queue<MqttMessage>();
        /// <summary>
        /// Messages that carry a packet identifier and have been sent but not yet acknowledged.
        /// </summary>
        private readonly LinkedList<DetermineMessage> _determineMessage = new LinkedList<DetermineMessage>();
        /// <summary>
        /// Requested subscriptions keyed by topic; kept so they can be re-sent after a reconnect.
        /// </summary>
        private readonly Dictionary<string, Subscribe> _subscribeMessage = new Dictionary<string, Subscribe>();
        /// <summary>
        /// Current connection state. Inside this class it is changed through OnStateUpdate.
        /// </summary>
        public ConnectionState ConnectionState;
        /// <summary>
        /// Number of heartbeats that found the client still in the Connecting state
        /// (used to detect a login that never completes).
        /// </summary>
        private int _loginTimeout;
        #endregion

        #region 属性



        /// <summary>
        /// Time at which data (including a ping response) was last received from the broker.
        /// </summary>
        public DateTime LastTime { get; private set; }
        ///// <summary>
        ///// Communication state
        ///// </summary>
        //public bool CommunicationState { get; private set; }
        /// <summary>
        /// Keep-alive value supplied to the constructor; the heartbeat loop uses it when
        /// computing the receive timeout.
        /// </summary>
        public short KeepAlive { get; set; }

        #endregion


        /// <summary>
        /// Creates a client for the broker at <paramref name="ip"/>:<paramref name="port"/> and
        /// starts its two background loops: the heartbeat/reconnect loop and the loop that
        /// sends queued messages.
        /// </summary>
        /// <remarks>
        /// The constructor itself does not open a connection; the heartbeat loop calls
        /// <see cref="Connection"/> when it finds the client not connected.
        /// NOTE(review): the loops run until the process ends; <see cref="Dispose"/> does not stop them.
        /// </remarks>
        /// <param name="ip">Broker address handed to the transport when it is created.</param>
        /// <param name="port">Broker port handed to the transport when it is created.</param>
        /// <param name="keepAlive">Heartbeat interval in seconds; also stored in <see cref="KeepAlive"/>.</param>
        /// <param name="clientId">Client identifier used when building the connect message.</param>
        /// <param name="log">Optional logger; when null nothing is written to a log.</param>
        public MqttClient(string ip, int port, short keepAlive, string clientId, ILog log = null)
        {
            _ip = ip;
            _port = port;
            KeepAlive = keepAlive;
            _clientId = clientId;
            _logManager = log;

            // Both loops never return and spend most of their time in Thread.Sleep, so they are
            // flagged as long-running instead of being scheduled like short work items.
            var heartbeatTask = new Task(() => RunHeartbeatLoop(keepAlive), TaskCreationOptions.LongRunning);
            heartbeatTask.Start();

            var messageSendTask = new Task(() => RunSendLoop(), TaskCreationOptions.LongRunning);
            messageSendTask.Start();
        }

        /// <summary>
        /// Heartbeat loop: every <paramref name="keepAlive"/> seconds checks the receive timeout,
        /// reconnects when necessary, sends a ping and re-queues unacknowledged subscriptions.
        /// </summary>
        /// <param name="keepAlive">Interval in seconds between two heartbeats.</param>
        private void RunHeartbeatLoop(short keepAlive)
        {
            // Number of heartbeats a login may stay unanswered before it is abandoned.
            const int maxLoginHeartbeats = 3;

            while (true)
            {
                Thread.Sleep(1000 * keepAlive);
                try
                {
                    // NOTE(review): the timeout is KeepAlive * keepAlive seconds (the value squared
                    // unless the property was changed) - kept as in the original, confirm intent.
                    if (LastTime.AddSeconds(KeepAlive * keepAlive) < DateTime.Now)
                    {
                        OnStateUpdate(ConnectionState.Disconnected);
                        _loginTimeout = 0; // a fresh login attempt starts with a fresh counter
                        Connection();
                        continue;
                    }

                    if (ConnectionState != ConnectionState.Connected)
                    {
                        Connection();
                    }

                    if (ConnectionState == ConnectionState.Connecting)
                    {
                        _loginTimeout++;
                        if (_loginTimeout >= maxLoginHeartbeats)
                        {
                            // Reset the counter; otherwise every later login would be
                            // abandoned on its first heartbeat.
                            _loginTimeout = 0;
                            OnStateUpdate(ConnectionState.Disconnected);
                        }
                    }
                    else
                    {
                        _loginTimeout = 0;
                    }

                    Send(new PingReqMessage());
                    RequeueUnacknowledgedSubscriptions();
                }
                catch (Exception ex)
                {
                    _logManager?.Error(ex);
                }
            }
            // ReSharper disable once FunctionNeverReturns
        }

        /// <summary>
        /// Queues a new subscribe request for every subscription that was sent more than
        /// 60 seconds ago and has not been acknowledged.
        /// </summary>
        private void RequeueUnacknowledgedSubscriptions()
        {
            var cutoff = DateTime.Now.AddSeconds(-60);
            var overdue = _subscribeMessage.Values
                .Where(entry => entry.SendTime < cutoff && !entry.State)
                .ToList();

            foreach (var entry in overdue)
            {
                entry.State = false;
                entry.SendTime = DateTime.Now;

                var request = new SubscribeMessage
                {
                    FixedHeader = { Qos = Qos.AtLeastOnce }
                };
                request.Subscribe(entry.Topics, entry.Qos);
                SendAsync(request);
            }
        }

        /// <summary>
        /// Send loop: takes one queued message per iteration, sends it, remembers messages that
        /// carry a packet identifier as pending, and re-sends pending messages older than 60 seconds.
        /// NOTE(review): the queue and the pending list are also touched by other threads
        /// without visible synchronization.
        /// </summary>
        private void RunSendLoop()
        {
            while (true)
            {
                try
                {
                    if (_sendMessage.Count > 0)
                    {
                        var message = _sendMessage.Dequeue();
                        Send(message);

                        var tracked = message as MqttMessagePacketId;
                        if (tracked != null)
                        {
                            // Any later transmission of this message is a duplicate.
                            message.FixedHeader.Dup = true;
                            _determineMessage.AddLast(new DetermineMessage
                            {
                                SendTime = DateTime.Now,
                                Message = tracked
                            });
                        }
                    }

                    Thread.Sleep(100);

                    if (_determineMessage.Count > 0)
                    {
                        var cutoff = DateTime.Now.AddSeconds(-60);
                        var overdue = _determineMessage.Where(entry => entry.SendTime < cutoff).ToList();

                        // Each overdue message is re-sent once and then dropped from the pending list.
                        foreach (var entry in overdue)
                        {
                            Send(entry.Message);
                            _determineMessage.Remove(entry);
                            Thread.Sleep(100);
                        }
                    }
                }
                catch (Exception ex)
                {
                    _logManager?.Error(ex);
                }
            }
            // ReSharper disable once FunctionNeverReturns
        }

        #region  连接 基础方法


        /// <summary>
        /// Starts a connection attempt unless one is already in progress or established:
        /// creates the transport on first use, sends the login, switches to
        /// <see cref="ConnectionState.Connecting"/>, waits one second and queues a subscribe
        /// request for every remembered subscription.
        /// </summary>
        /// <returns>
        /// true when a login was sent and the subscriptions were queued; false when the client
        /// was already connecting/connected, the login could not be sent, or an exception occurred
        /// (in which case the state is set to Disconnected).
        /// </returns>
        public bool Connection()
        {
            try
            {
                if (_commun == null)
                {
                    _commun = new TcpClient(_ip, _port, 1, true);
                    _commun.ReceiveEvent += _commun_ReceiveEvent;
                }

                var alreadyActive = ConnectionState == ConnectionState.Connecting
                                    || ConnectionState == ConnectionState.Connected;
                if (alreadyActive || !Login())
                {
                    return false;
                }

                OnStateUpdate(ConnectionState.Connecting);

                Thread.Sleep(1000);

                // Re-request every remembered subscription.
                foreach (var entry in _subscribeMessage)
                {
                    var request = new SubscribeMessage();
                    request.FixedHeader.Qos = Qos.AtLeastOnce;
                    request.Subscribe(entry.Key, entry.Value.Qos);
                    SendAsync(request);

                    entry.Value.State = false;
                    entry.Value.SendTime = DateTime.Now;
                }

                return true;
            }
            catch (Exception ex)
            {
                OnStateUpdate(ConnectionState.Disconnected);

                _logManager?.Error(ex);
            }

            return false;
        }
        /// <summary>
        /// Handles data received from the transport: records the receive time, decodes the
        /// message through <c>Handle</c>, reports it, and removes every pending message with the
        /// same packet identifier. When the pending message is a subscribe request, the
        /// matching subscription is marked as acknowledged.
        /// </summary>
        /// <param name="data">Raw bytes received; nothing is decoded when null.</param>
        private void _commun_ReceiveEvent(byte[] data)
        {
            try
            {
                LastTime = DateTime.Now;
                if (data == null)
                {
                    return;
                }

                MqttMessage message = Handle(data);
                Log("接受", message);

                var identified = message as MqttMessagePacketId;
                if (identified == null)
                {
                    return;
                }

                var packetId = identified.PacketIdentifier;

                // Iterate over a snapshot because matching entries are removed inside the loop.
                foreach (var pending in _determineMessage.ToList())
                {
                    if (pending.Message.PacketIdentifier != packetId)
                    {
                        continue;
                    }

                    // NOTE(review): any received message type other than SUBSCRIBE is treated as
                    // the answer here - confirm that only the subscribe acknowledgement should match.
                    if (MessageType.SUBSCRIBE != message.FixedHeader.MessageType)
                    {
                        var request = pending.Message as SubscribeMessage;
                        if (request != null)
                        {
                            // TryGetValue instead of the indexer: an untracked or null topic must not
                            // throw and skip the removal of the acknowledged entry below.
                            var topic = request.Topics[0].Topic;
                            Subscribe tracked;
                            if (topic != null && _subscribeMessage.TryGetValue(topic, out tracked))
                            {
                                tracked.State = true;
                            }
                        }
                    }

                    _determineMessage.Remove(pending);
                }
            }
            catch (Exception ex)
            {
                _logManager?.Error(ex);
            }
        }
        /// <summary>
        /// Sends a message over the transport according to the current connection state.
        /// Connected: sends, and on failure marks the client disconnected, tries to reconnect and
        /// re-sends. Connecting: only ping requests are sent. Otherwise: triggers a connection
        /// attempt and sends nothing.
        /// </summary>
        /// <param name="message">Message to send.</param>
        /// <returns>true when the transport reported the message as sent; otherwise false.</returns>
        private bool Send(MqttMessage message)
        {
            if (ConnectionState == ConnectionState.Connected)
            {
                var sent = _commun.Send(message.GetDataBytes());
                if (!sent)
                {
                    OnStateUpdate(ConnectionState.Disconnected);
                    if (Connection())
                    {
                        // Reconnect was started; try the message once more.
                        sent = Send(message);
                    }
                }

                Log(sent ? "发送成功" : "发送失败", message);
                return sent;
            }

            if (ConnectionState == ConnectionState.Connecting)
            {
                if (!(message is PingReqMessage))
                {
                    return false;
                }

                var pingSent = _commun.Send(message.GetDataBytes());
                if (!pingSent)
                {
                    OnStateUpdate(ConnectionState.Disconnected);
                }

                return pingSent;
            }

            Connection();
            return false;
        }
        /// <summary>
        /// Raised with the new state every time the connection state is updated.
        /// </summary>
        public event Action<ConnectionState> StateUpdateEvent;
        /// <summary>
        /// Raised from <c>RestartMethod</c>.
        /// </summary>
        public event Action RestartEvent;
        /// <summary>
        /// Stores the new connection state and notifies the <see cref="StateUpdateEvent"/> subscribers.
        /// </summary>
        /// <param name="state">State to switch to.</param>
        private void OnStateUpdate(ConnectionState state)
        {
            ConnectionState = state;

            var subscribers = StateUpdateEvent;
            if (subscribers != null)
            {
                subscribers(ConnectionState);
            }
        }



        /// <summary>
        /// Queues a message for the background send loop; nothing is transmitted here.
        /// </summary>
        /// <param name="message">Message to queue.</param>
        /// <returns>Always true (the message was queued, not necessarily sent).</returns>
        private bool SendAsync(MqttMessage message)
        {
            _sendMessage.Enqueue(message);
            return true;
        }

        #endregion

        #region 外部调用

        /// <summary>
        /// Raised for every logged message (received messages and send results) with a title
        /// and the message itself.
        /// </summary>
        public event Action<string, MqttMessage> LogEvent;
        /// <summary>
        /// Raised when a publish message is handled; carries the topic name and the payload.
        /// </summary>
        public event Action<string, byte[]> PublishMessageEvent;

        /// <summary>
        /// Builds the connect message for this client and sends it directly over the transport.
        /// The connection state is not changed here.
        /// </summary>
        /// <remarks>
        /// NOTE(review): user name and password are hard-coded; consider making them configurable.
        /// </remarks>
        /// <returns>
        /// true when the transport reported the message as sent; false when it could not be sent
        /// or when the transport has not been created yet (see <see cref="Connection"/>).
        /// </returns>
        public bool Login()
        {
            // Login is public and can be called before Connection() created the transport;
            // report failure instead of throwing a NullReferenceException.
            var transport = _commun;
            if (transport == null)
            {
                return false;
            }

            ConnectMessage connectMessage = new ConnectMessage(_clientId)
            {
                UserName = "666666",
                Password = "666666",
                CleanSession = true
            };
            return transport.Send(connectMessage.GetDataBytes());
        }

        /// <summary>
        /// Queues a publish message with the given payload for the given topic.
        /// </summary>
        /// <param name="topicName">Topic to publish to.</param>
        /// <param name="payload">Raw payload bytes.</param>
        /// <param name="qos">Quality-of-service level set on the message header.</param>
        /// <returns>The result of queuing the message (the message is sent later by the send loop).</returns>
        public bool Push(string topicName, byte[] payload, Qos qos)
        {
            var outgoing = new PublishMessage();
            outgoing.FixedHeader.Qos = qos;
            outgoing.TopicName = topicName;
            outgoing.Payload = payload;

            return SendAsync(outgoing);
        }
        /// <summary>
        /// Serializes <paramref name="payload"/> with <paramref name="serialize"/> and queues the
        /// result as a publish message for the given topic.
        /// </summary>
        /// <typeparam name="T">Type of the payload object.</typeparam>
        /// <param name="topicName">Topic to publish to.</param>
        /// <param name="payload">Object to serialize and publish.</param>
        /// <param name="qos">Quality-of-service level set on the message header.</param>
        /// <param name="serialize">Serializer that turns the payload into bytes; must not be null.</param>
        /// <returns>The result of queuing the message (the message is sent later by the send loop).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="serialize"/> is null.</exception>
        public bool Push<T>(string topicName, T payload, Qos qos, ISerialize serialize) where T : class//, new()
        {
            if (serialize == null)
            {
                // Specific exception type (still an Exception, so existing catch blocks keep working).
                throw new ArgumentNullException(nameof(serialize), "_serialize = null");
            }

            // Same message construction as the byte[] overload, so reuse it.
            return Push(topicName, serialize.Serializer(payload), qos);
        }

        /// <summary>
        /// Remembers the subscription (replacing an earlier one for the same topic) so it can be
        /// re-sent later, and queues the subscribe request.
        /// </summary>
        /// <param name="topics">Topic to subscribe to.</param>
        /// <param name="qos">Quality-of-service level requested for the topic.</param>
        /// <returns>The result of queuing the request (the request is sent later by the send loop).</returns>
        public bool Subscribe(string topics, Qos qos)
        {
            // The indexer adds the entry or overwrites an existing one.
            _subscribeMessage[topics] = new Subscribe
            {
                Topics = topics,
                Qos = qos,
                State = false,
                SendTime = DateTime.Now
            };

            var request = new SubscribeMessage();
            request.FixedHeader.Qos = Qos.AtLeastOnce;
            request.Subscribe(topics, qos);

            return SendAsync(request);
        }
        /// <summary>
        /// Forgets the remembered subscription for the topic and queues the unsubscribe request.
        /// </summary>
        /// <param name="topics">Topic to unsubscribe from.</param>
        /// <returns>The result of queuing the request (the request is sent later by the send loop).</returns>
        public bool UnSubscribe(string topics)
        {
            // Drop the remembered subscription; otherwise the reconnect and heartbeat logic
            // would subscribe to this topic again.
            if (topics != null)
            {
                _subscribeMessage.Remove(topics);
            }

            UnsubscribeMessage message = new UnsubscribeMessage
            {
                FixedHeader =
                {
                    Qos = Qos.AtLeastOnce
                },
                Topics = new List<string> { topics }
            };
            return SendAsync(message);
        }

        #endregion

        #region 抽象实现



        /// <summary>
        /// Handles the connect acknowledgement: switches to <see cref="ConnectionState.Connected"/>
        /// when the broker accepted the connection; any other return code is ignored here.
        /// </summary>
        /// <param name="message">The received connect acknowledgement.</param>
        protected override void ConnectAckMethod(ConnAckMessage message)
        {
            if (message.ReturnCode != MqttConnectReturnCode.ConnectionAccepted)
            {
                return;
            }

            OnStateUpdate(ConnectionState.Connected);
        }

        /// <summary>
        /// Handles a ping response by recording the current time as the last receive time.
        /// </summary>
        /// <param name="message">The received ping response (not inspected).</param>
        protected override void PingResMethod(PingRespMessage message) => LastTime = DateTime.Now;

        /// <summary>
        /// Handles a received publish message by forwarding its topic name and payload to the
        /// <see cref="PublishMessageEvent"/> subscribers. Exceptions thrown by subscribers are
        /// logged (when a logger is configured) and not propagated.
        /// </summary>
        /// <param name="message">The received publish message.</param>
        protected override void PublishMethod(PublishMessage message)
        {
            try
            {
                PublishMessageEvent?.Invoke(message.TopicName, message.Payload);
            }
            catch (Exception ex)
            {
                // The logger is optional; without the null-conditional a missing logger would
                // turn a handled subscriber error into a NullReferenceException.
                _logManager?.Error(ex);
            }
        }

        /// <summary>
        /// Override for subscribe acknowledgements; no additional handling is performed here.
        /// </summary>
        /// <param name="message">The received subscribe acknowledgement (unused).</param>
        protected override void SubscribeAckMethod(SubscribeAckMessage message)
        {
        }

        /// <summary>
        /// Override for unsubscribe acknowledgements; no additional handling is performed here.
        /// </summary>
        /// <param name="message">The received unsubscribe acknowledgement (unused).</param>
        protected override void UnsubscribeAckMethod(UnsubscribeAckMessage message)
        {
        }

        /// <summary>
        /// Override for publish acknowledgements; no additional handling is performed here.
        /// </summary>
        /// <param name="message">The received publish acknowledgement (unused).</param>
        protected override void PublishAckMethod(PublishAckMessage message)
        {
        }

        /// <summary>
        /// Notifies the <see cref="RestartEvent"/> subscribers, if any.
        /// </summary>
        protected override void RestartMethod()
        {
            var subscribers = RestartEvent;
            if (subscribers != null)
            {
                subscribers();
            }
        }

        #endregion
        /// <summary>
        /// Stops the transport if it has been created.
        /// NOTE(review): the background loops started by the constructor are not stopped here.
        /// </summary>
        public void Dispose()
        {
            var transport = _commun;
            if (transport != null)
            {
                transport.Stop();
            }
        }

        /// <summary>
        /// Stops the transport if it has been created.
        /// </summary>
        public void Stop() => _commun?.Stop();
    }
}
